/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.hadoop.yarn.server.nodemanager.DeletionService;

/**
 * A class responsible for cleaning the PUBLIC and PRIVATE local caches on a
 * node manager.
 */
class LocalCacheCleaner {

  /**
   * Sum of the sizes of every resource seen by {@link #addResources} since
   * the last {@link #cleanCache()} call, including resources that are still
   * referenced and therefore not eligible for deletion.
   */
  private long currentSize;

  /** Cache size that {@link #cleanCache()} tries to get down to. */
  private final long targetSize;

  /** Handed to {@code LocalResourcesTracker.remove} for each deletion. */
  private final DeletionService delService;

  /**
   * Deletion candidates (resources with a reference count of zero) mapped to
   * the tracker that owns them; iteration order is the deletion order.
   */
  private final SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap;

  /**
   * Creates a cleaner that deletes resources with the smallest timestamp
   * first.
   *
   * @param delService deletion service passed to the trackers on removal
   * @param targetSize cache size to shrink to
   */
  LocalCacheCleaner(DeletionService delService, long targetSize) {
    this(delService, targetSize, new LRUComparator());
  }

  /**
   * Creates a cleaner that deletes resources in the order defined by the
   * given comparator.
   *
   * @param delService deletion service passed to the trackers on removal
   * @param targetSize cache size to shrink to
   * @param cmp ordering of the deletion candidates; resources that compare
   *          as equal are treated as the same candidate by the backing
   *          {@link TreeMap}
   */
  LocalCacheCleaner(DeletionService delService, long targetSize,
      Comparator<? super LocalizedResource> cmp) {
    this(delService, targetSize,
        new TreeMap<LocalizedResource, LocalResourcesTracker>(cmp));
  }

  /**
   * Creates a cleaner backed by the given map. The map is used directly (not
   * copied) and is cleared by every {@link #cleanCache()} call.
   *
   * @param delService deletion service passed to the trackers on removal
   * @param targetSize cache size to shrink to
   * @param resourceMap sorted map that holds the deletion candidates
   */
  LocalCacheCleaner(DeletionService delService, long targetSize,
      SortedMap<LocalizedResource, LocalResourcesTracker> resourceMap) {
    this.resourceMap = resourceMap;
    this.delService = delService;
    this.targetSize = targetSize;
  }

  /**
   * Adds resources from the passed LocalResourceTracker that are candidates for
   * deletion from the cache.
   *
   * The size of every tracked resource is added to the current cache size,
   * but only resources whose reference count is not positive become deletion
   * candidates.
   *
   * @param newTracker add all resources being tracked by the passed
   *          LocalResourcesTracker to the LocalCacheCleaner.
   */
  public void addResources(LocalResourcesTracker newTracker) {
    for (LocalizedResource resource : newTracker) {
      // In-use resources still occupy cache space, so count them as well.
      currentSize += resource.getSize();
      if (resource.getRefCount() <= 0) {
        // Only resources that nobody references may be deleted.
        resourceMap.put(resource, newTracker);
      }
    }
  }

  /**
   * Delete resources from the cache in the sorted order generated by the
   * Comparator used to construct this class.
   *
   * Deletion stops as soon as the remaining size is no longer above the
   * target size or the candidates are exhausted. Afterwards the candidates
   * and the accumulated cache size are both reset, so trackers have to be
   * added again through {@link #addResources} before the next call.
   *
   * @return stats about what was cleaned up during this call of cleanCache
   */
  public LocalCacheCleanerStats cleanCache() {
    LocalCacheCleanerStats stats = new LocalCacheCleanerStats(currentSize);
    Iterator<Map.Entry<LocalizedResource, LocalResourcesTracker>> candidates =
        resourceMap.entrySet().iterator();
    while (currentSize - stats.getTotalDelSize() > targetSize
        && candidates.hasNext()) {
      Map.Entry<LocalizedResource, LocalResourcesTracker> candidate =
          candidates.next();
      LocalizedResource resource = candidate.getKey();
      LocalResourcesTracker tracker = candidate.getValue();
      // Only account for the resource if the tracker really removed it.
      if (tracker.remove(resource, delService)) {
        stats.incDelSize(tracker.getUser(), resource.getSize());
      }
    }
    resourceMap.clear();
    // The candidates are gone, so the size accumulated for them is stale.
    // Reset it together with the map; otherwise a later addResources() would
    // count every resource on top of the old total and cause over-deletion.
    currentSize = 0;
    return stats;
  }

  /**
   * Accumulates how many bytes a single {@link LocalCacheCleaner#cleanCache()}
   * call deleted, in total, for the PUBLIC cache and per user for the PRIVATE
   * caches.
   */
  static class LocalCacheCleanerStats {
    /** Deleted size per user, sorted by user name. */
    private final Map<String, Long> userDelSizes = new TreeMap<String, Long>();
    private final long cacheSizeBeforeClean;
    private long totalDelSize;
    private long publicDelSize;
    private long privateDelSize;

    /**
     * @param cacheSizeBeforeClean size of the cache before anything was
     *          deleted
     */
    LocalCacheCleanerStats(long cacheSizeBeforeClean) {
      this.cacheSizeBeforeClean = cacheSizeBeforeClean;
    }

    /**
     * Records one deletion.
     *
     * @param user owner of the deleted resource; {@code null} books the
     *          deletion under the PUBLIC cache instead of a user
     * @param delSize size of the deleted resource
     */
    void incDelSize(String user, long delSize) {
      totalDelSize += delSize;
      if (user == null) {
        publicDelSize += delSize;
        return;
      }
      privateDelSize += delSize;
      Long previous = userDelSizes.get(user);
      userDelSizes.put(user, previous == null ? delSize : previous + delSize);
    }

    /**
     * @return read-only view of the deleted size per user
     */
    Map<String, Long> getUserDelSizes() {
      return Collections.unmodifiableMap(userDelSizes);
    }

    /**
     * @return cache size before the clean started
     */
    long getCacheSizeBeforeClean() {
      return cacheSizeBeforeClean;
    }

    /**
     * @return deleted size over the PUBLIC and PRIVATE caches together
     */
    long getTotalDelSize() {
      return totalDelSize;
    }

    /**
     * @return deleted size that was recorded without a user
     */
    long getPublicDelSize() {
      return publicDelSize;
    }

    /**
     * @return deleted size that was recorded for any user
     */
    long getPrivateDelSize() {
      return privateDelSize;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("Cache Size Before Clean: ").append(cacheSizeBeforeClean)
          .append(", ");
      sb.append("Total Deleted: ").append(totalDelSize).append(", ");
      sb.append("Public Deleted: ").append(publicDelSize).append(", ");
      sb.append("Private Deleted: ").append(privateDelSize);
      return sb.toString();
    }

    /**
     * @return the {@link #toString()} summary followed by the deleted size of
     *         every user
     */
    public String toStringDetailed() {
      StringBuilder sb = new StringBuilder();
      sb.append(this.toString());
      sb.append(", Private Deleted Detail: {");
      for (Map.Entry<String, Long> e : userDelSizes.entrySet()) {
        sb.append(" ").append(e.getKey()).append(":").append(e.getValue());
      }
      sb.append(" }");
      return sb.toString();
    }
  }

  /**
   * Orders resources by ascending timestamp, so the resource with the
   * smallest timestamp is deleted first. Resources with the same timestamp
   * are ordered by their identity hash code.
   *
   * NOTE(review): two distinct resources with the same timestamp and the same
   * identity hash code still compare as equal, and a TreeMap keeps only one
   * entry for them. Identity hash codes are not guaranteed to be unique.
   */
  private static class LRUComparator implements Comparator<LocalizedResource>,
      Serializable {

    private static final long serialVersionUID = 7034380228434701685L;

    @Override
    public int compare(LocalizedResource r1, LocalizedResource r2) {
      // Deliberately the sign of the difference, as in the original code.
      // NOTE(review): presumably the timestamps are System.nanoTime() based,
      // for which the difference is the overflow-tolerant comparison; confirm
      // before switching to Long.compare.
      long diff = r1.getTimestamp() - r2.getTimestamp();
      if (diff != 0) {
        return diff > 0 ? 1 : -1;
      }
      // Identity hash codes may be any int, so subtracting them can overflow
      // and flip the sign; compare them instead.
      return Integer.compare(System.identityHashCode(r1),
          System.identityHashCode(r2));
    }
  }
}
